package com.gitee.jastee.kafka.producer;

import com.gitee.jastee.util.ParameterTool;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import static com.gitee.jastee.kafka.constant.KafkaConstants.KAFKA_PREFIX;

/**
 * @date 2022/03/3 下午1:31
 * @description Kafka生产者连接
 * @author Jast
 */
public class KafkaProducerClient {

    private static Producer<String, String> producer;

    public KafkaProducerClient(ParameterTool parameterTool) {

        Properties props = new Properties();
        /** Kafka集群地址 */
        props.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                parameterTool.getRequired(KAFKA_PREFIX + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        /**
         * producer需要server接收到数据之后发出的确认接收的信号，此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项：
         * （1）acks=0： 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket
         * buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据，同时重试配置不会发生作用（因为客户端不知道是否失败）回馈的offset会总是设置为-1；
         * （2）acks=1：
         * 这意味着至少要等待leader已经成功将数据写入本地log，但是并没有等待所有follower是否成功写入。这种情况下，如果follower没有成功备份数据，而此时leader又挂掉，则消息会丢失。
         * （3）acks=all： 这意味着leader需要等待所有备份都成功写入日志，这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
         * （4）其他的设置，例如acks=2也是可以的，这将需要给定的acks数量，但是这种策略一般很少用。
         */
        props.put(
                ProducerConfig.ACKS_CONFIG,
                parameterTool.get(KAFKA_PREFIX + ProducerConfig.ACKS_CONFIG, "all"));
        /**
         * 设置大于0的值将使客户端重新发送任何数据，一旦这些数据发送失败。注意，这些重试与客户端接收到发送错误时的重试没有什么不同。
         * 允许重试将潜在的改变数据的顺序，如果这两个消息记录都是发送到同一个partition，则第一个消息失败第二个发送成功，则第二条消息会比第一条消息出现要早。
         */
        props.put(
                ProducerConfig.RETRIES_CONFIG,
                parameterTool.getInt(KAFKA_PREFIX + ProducerConfig.RETRIES_CONFIG, 5));
        /** kafka会默认将发送到一个partiton的数据进行整合，这个大小是处理请求数据大小batch发送的，如果太小，可能就只能单独请求发送消息给kafka。 */
        props.put(
                ProducerConfig.BATCH_SIZE_CONFIG,
                parameterTool.getInt(KAFKA_PREFIX + ProducerConfig.BATCH_SIZE_CONFIG, 16384));
        /** 如果达不到batch.size大小，每隔固定时间发送一次 */
        props.put(
                ProducerConfig.LINGER_MS_CONFIG,
                parameterTool.getInt(KAFKA_PREFIX + ProducerConfig.LINGER_MS_CONFIG, 1));
        /**
         * producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度，producer会阻塞或者抛出异常，以“block.on.buffer.full”来表明。
         * 这项设置将和producer能够使用的总内存相关，但并不是一个硬性的限制，因为不是producer使用的所有内存都是用于缓存。
         * 一些额外的内存会用于压缩（如果引入压缩机制），同样还有一些用于维护请求。
         */
        props.put(
                ProducerConfig.BUFFER_MEMORY_CONFIG,
                parameterTool.getInt(KAFKA_PREFIX + ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432));
        /** 发送消息最大大小，理论上应与服务端一致 */
        props.put(
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
                parameterTool.getInt(
                        KAFKA_PREFIX + ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10 * 1024 * 1024));
        /** key的解析序列化接口实现类 */
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        /** value的解析序列化接口实现类 */
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // props.put("connections.max.idle.ms", "5000");
        // props.put("group.max.session.timeout.ms", "5000");
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }
}
